ULTRON — 基于Flink实时数仓平台
The following article is from 数据架构那些事 Author 朱广彬
热文回顾:☞ Flink SQL实时数仓开源UI平台
正文
背景与需求
架构设计
Ultron平台化之路
未来规划
实时报表:统计报表实时产出,及时统计展示流量、花费、RPM等指标,快速决策响应
实时监控:用于快速定位问题,避免故障恶化
实时模型反馈与策略控制:在广告的优化的过程中,能够快速优化模型,优化广告效果
随着业务的发展和规模的不断扩大,这套系统已经不能满足需求,主要集中在以下几个方面:
实时数据没有数仓化。Ultron 1.0当初的设计仅仅是提供了一个Flink集群部署和作业提交的平台,并没有对数据做很好地约束和规范。实时的数据主要存储在Kafka中,还有Aerospike/Redis作为维表存储,而这些数据并没有像离线数仓一样做schema规范,没有统一的元数据管理,缺乏数据分层和模型约束。随着数据规模的不断扩大,实时作业越来越多,出现了很多烟囱式开发以及重复低效数据使用的问题。 开发复杂度高、管理成本高。Ultron 1.0只提供了JobJar方式提交作业,Flink 的 Job API相对比较复杂,用户需要对Flink有非常深入的理解,有一定的开发难度。从平台维护角度,虽然我们提供了统一的maven脚手架,但也很难强制约束,Job Jar对平台是个黑盒,其内在逻辑无法感知,甚至会出现一些使用不正确甚至滥用的方式,管理成本高。
缺乏平台和数据治理。Ultron 1.0对权限的控制比较粗糙,也没有项目规划,因此存在误操作的风险。数据层面上没有元数据统一管理,作业层面上JobJar的黑盒逻辑,因此很难做好数据血缘、任务依赖以及数据地图等治理,很容易出现数据问题,而出问题时只能从上游到下游层层排查,故障率高,效率低下。 实时离线架构不统一。离线是已经成熟的基于Hive的数仓式管理,业务用SQL来完成Hive数仓ETL开发,而实时则是Flink API开发。同一个业务既有离线数仓报表需求又有实时报表的需求,需要熟悉两套架构,而这两套架构有着很大的差异,即使有很多ETL逻辑是相同的。
随着Flink在SQL上的逐步完善,以及对周边生态的支持越来越成熟,尤其是Flink 在1.10版本之后在批流合一上的新特性,数据架构团队从去年就开始调研FlinkSQL,规划基于FlinkSQL的新数仓架构。
02 架构设计
数据源:包含引擎日志、物料DB数据以及其他第三方数据 数据采集与传输:我们约定所有源数据均要收集到Kafka消息队列,再决定走实时处理还是落地HDFS/Hive离线处理。日志会通过FileBeat/LogStash等采集Agent收集到Kafka,或者由引擎直接写出到Kafka中。对于物料DB数据会通过CDC采集,我们目前采用MaxWell。而涉及到异构数据传输的需求,比如Kafka数据同步到HDFS,我们内部研发了一款Hamal的框架,其核心也是基于Flink,用户只需要定义好Source/Sink以及相应的格式等必要的配置即可提交Job进行实时的数据传输。 存储层:实时消息数据的主要存储在Kafka消息队列;离线的文件存储在HDFS,对访问性能有要求的场景通过Alluxio加速。同时,我们还有半结构化NoSQL的AeroSpike/HBase/Redis/ES,以及结构化数据库MySQL/TiDB。注意这里只是在架构上统一归属到存储,从数据处理流程上,既可能是源数据存储,也可能是最终的结果存储。比如MySQL中的数据,既有ETL过程中作为维表的,也有作为最终报表的。 计算层:我们目前已有的资源调度系统有YARN/Mesos/K8S,对于离线Batch场景,我们选择on YARN,对于实时Streaming服务,我们更倾向于on Mesos/K8S(Mesos在商业化的应用历史很久,我们很早就在应用,随着K8S的逐步完善,后续将逐步统一到K8S)。对于实时流计算,我们目前已经统一到了Flink Streaming;而对于离线批计算,我们正在探索Flink Batch,当然还有传统的MR/Pig/Spark/Hive等。 数仓与OLAP:对于离线数仓体系,以Hive为核心,用Presto作为ROLAP引擎;而实时数仓,我们将整合Kafka/AeroSpike/HBase/MySQL/TiDB等数据进行仓库化,用FlinkSQL计算,用Druid作为实时的MOLAP引擎。实时数仓和离线数仓在模型规范和约束上保持统一,提供一个统一的数据发布与订阅平台,将实时/离线结果数据暴露给上层应用。 数据访问层:我们的结果数据访问场景有传统报表/Dashboard展示类、多维分析类、常规检索查询以及海量实时查询等。 应用层:基于上述的架构,支撑商业化各业务线的数据需求,如报表平台、分析平台、A/B Test、用户画像、推荐等。
从数仓角度,无论实时还是离线,统一规划数仓模型,参考阿里One Data,定义ODS/DWD/DWT/DIM/DWA/APP等数据分层。数仓化管理已经是比较成熟的数据管理体系,约束数据分层的好处在此不再赘述。
既然实时也要跟离线一样进行数仓构建,在批流一体下的数仓流程应该是什么样呢?在我们的场景,
ODS:实时的ODS以Kafka为中心,因为我们的所有原始数据均会汇聚到Kafka。Kafka的原始数据通过Hamal落地HDFS/Hive即是离线数仓的ODS层。
DW层:实时数仓以Kafka作为主要的事实表,还需要整合HBase/AeroSpike/Redis/MySQL等,它们有场景下是维表,有场景下是源事实表,也可能是结果事实表,通过FlinkSQL进行Streaming ETL。而离线Hive数仓应用多年,有很成熟的构建体系和ETL流程。值得注意的是,在Flink在1.11版本对Hive的集成已经有了相对成熟的支持,这样通过Flink打通实时数仓和离线数仓,整个架构处理方便了很多。比如实时数仓中Kafka的表可以直接通过HiveTableSink落地到Hive,在Hive中进行离线ETL处理。实时数仓中的Kafka表也可以跟离线的Hive表进行Join。未来的DW层,实时数仓和离线数仓的界限会越来越模糊。
OLAP:Druid已经在我们生产环境中得到广泛的应用,作为一款实时MOLAP引擎,其实时摄入、低延迟查询、易扩展等特性,在我们的实时报表中发挥了很重要的作用。Presto我们用来加速Hive中APP应用数据的快速多维分析,其与Hive无缝结合,通过Alluxio缓存加速,报表分析可以在数秒内响应。TiDB作为HTAP的新贵,目前也在推广应用中,作为常规报表分析的引擎替换传统MySQL。
具体到如何基于FlinkSQL来构建实时数仓,其思路大概如下:Flink的Table API提供了对kafka/jdbc/hbase等实时开发涉及到的组件的支持,以kafka为例,将kafka topic抽象成Flink Table,如下:
CREATE TABLE flink_rtdw.demo.kafka_source_table (
topic STRING,
bidWord STRING,
planID STRING,
eventTime INTEGER,
procTime AS PROCTIME(),
ets AS TO_TIMESTAMP(FROM_UNIXTIME(eventTime)),
WATERMARK FOR ets AS ets - INTERVAL '1' MINUTE
) WITH (
'connector' = 'kafka',
'topic' = 'ba.join.shbt2.search-ocpc-click',
'properties.bootstrap.servers' = ‘Kafka-broker',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'latest-offset',
'format' = 'json'
);
CREATE TABLE flink_rtdw.demo.kafka_sink_table (
window_time BIGINT,
topic STRING,
bid_word_count BIGINT
) WITH (
'connector' = 'kafka',
'topic' = 'ultron.demo.shbt2.into.shbt2.tumlewindow.dev',
'properties.bootstrap.servers' = ‘kafka-broker',
'format' = 'json'
);
这样我们就和Hive Table一样对Kafka topic资产进行数仓化建模,数仓ETL作业开发上也可以用SQL+UDF实现大部分逻辑,如下示例:
INSERT INTO
flink_rtdw.demo.kafka_sink_table
SELECT
UNIX_TIMESTAMP(
DATE_FORMAT(
TUMBLE_START(procTime, INTERVAL '1' MINUTE),
'yyyy-MM-dd HH:mm:ss'
)
) * 1000 as window_time,
topic,
COUNT(bidWord)
FROM
flink_rtdw.demo.kafka_source_table
GROUP BY
TUMBLE(procTime, INTERVAL '1' MINUTE),
topic;
同时基于数仓的数据管理如权限、血缘、质量等可以做到跟离线一致。
在这个架构下,无论实时数仓还是离线数仓,可以做到一切数据资产均视为表,一切ETL Job SQL化,在数据开发流程上做到实时离线统一,用户体验一致。我们希望开发一套全新的平台,来将以上流程进行整合,并进行数仓数据的统一治理。
得益于FlinkSQL 的快速发展以及Flink社区的强大,我们在去年年底确定了方案后开始了Ultron 2.0的平台化之路,经过几个月的努力,已经初步成型。
03 Ultron平台化之路
首先,Ultron 2.0与1.0不同,1.0的定位仅仅是提供Flink集群和作业的管理,是站在平台维护者的角度而设计的,为了更简单高效的进行集群部署和作业提交,并做到自动运维监控报警。而2.0的定位是实时数仓平台,是站在数据开发者的角度,不仅仅提供集群与作业的管理,还要关注数仓化建设、数据开发和数据治理。
Ultron 2.0 已经内部release一个版本,即将发布第二个,重构后的平台有了非常大的改进,我讲一一介绍其平台特性。
一站式平台服务
我们将开发流程梳理了下,全部整合到平台中,从项目构建,到集群部署,到作业开发,再到运维监控,这些环节都在平台完成,而且做了很多自动化的工作。对于开发者,只需要在页面上轻松配置即可完成。
如上图集群部署,我们将Flink集群部署的复杂命令和配置做了抽象和封装,用户只需要在页面上配置集群名、计算资源池(支持任意一个IDC的K8S/Mesos/Yarn集群)、版本、镜像,以及集群资源的配置(JM/TM的CPU/内存)等参数,即可按照步骤部署集群,部署后集群会自动收集Metrics监控,开启报警,整个过程非常简单。
集群以项目为粒度进行资源隔离,是为了做资源的配额控制,以方便做成本统计。例如,项目demo,其下部署的Flink集群,在K8S上是以项目名为粒度的namespace 并加了Quota限制,在Mesos上以role加Quota,在Yarn上以queue为粒度做Fair Scheduler上的Quota限制。
我们利用Docker镜像,支持任意Flink版本,用户只需要部署或升级时选择相应版本的镜像即可。
在作业开发上,与1.0的不同,我们提供JobJar和SQL两种开发模式。
对于JobJar,跟1.0的逻辑一样,仍然是将用户的Jar以REST API方式提交到集群运行和管理。 对于SQL方式,我们提供内置的SQL IDE开发环境,可以直接在平台开发。平台将SQL提交给Flink SQL Gateway编译成JobGraph后提交到集群。我们在ververica flink-sql-gateway(https://github.com/ververica/flink-sql-gateway)基础上做了很多的优化和扩展,来增强SQL Job的优化和平台对SQL Job的约束。
作业开发后,部署上线前,需要进行Job审核。即每一次作业变更,都需要由项目负责人进行审批,审批通过才允许上线。如果有问题,还可以回滚后重新上线发布。平台对每一个作业都有版本控制,都可以查看其历史版本,上线后发生问题时可以根据版本差异进行追溯定位。
上线后的metrics监控是自动收集的,我们将常用的监控指标画到Grafana 上,用户可以直接链接到其集群或作业的监控页面。
而集群或作业的报警是可以灵活配置的,项目内成员可以自助订阅其关心的集群或作业的报警,管理员也可以主动添加或删除报警组成员。我们的报警集成了公司内部的IM,支持消息报警和电话报警。
项目资产管理
在Ultron平台中,所有资产/作业均以项目为粒度进行归属,项目关联到部门,以进行成本核算和责任归属。数据资产归属到项目后,资产只能有一个Owner,来负责数据的产出或者数据维护,允许其他项目申请数据访问权限,由归属项目的负责人审批。
通过项目制权限管理,所有资产有了责任归属。数据开发是一个多人多项目协作的工作,责任清晰是为了保障数据的正确性,避免冲突。
Ultron平台对所有平台开发涉及到的资产进行统一管理,实时数仓涉及到的Kafka/HBase/MySQL/AeroSpike/Redis,离线数仓涉及到的Hive。以Kafka为例,我们对接每一个Kafka集群,约定用户在平台上统一申请创建和管理topic,申请producer/consumer权限。数据资产的元信息在平台中管理,如有哪些kafka topic,属于哪个kafka集群,是什么格式,有哪些项目有该topic的什么权限等等。有了这个基础,就可以打通数仓建模模块,对Kafka topic数仓化建模抽象成Flink Table。
项目成员可以灵活管理,项目内成员区分不同的角色,以协作分工开发。项目负责人负责管理项目内的成员和角色,用户也可以主动申请加入某项目,由项目负责人审批。如下图所示:
数仓化设计
前面提到过利用FlinkSQL构建数仓的思路,在平台实现上,我们提供统一的建模工具,在建表时,无论Flink Table还是Hive Table,无论实时还是离线,均约束数仓分层、主题域、权限、生命周期等。这一部分工作还在开发中。
对于数仓,站在数据开发者角度,我们划分为4个阶段:
数据建模:对某个主题下的数据需求进行表设计和创建 ETL Pipeline设计:根据需求,规划ETL 作业的流程 作业开发部署:具体开发ETL Pipeline中的各个作业并部署 数据服务:最终的结果数据通过OLAP对外提供服务
在数据建模方面,我们的建表工具会由用户输入自动生成Flink Table DDL语句,并持久化到hive metastore,由hive metastore作为实时数仓的元数据中心。
以Kafka为例,如上图所示,DDL由三部分组成:
schema + 时间属性:kafka topic我们支持json/csv/avro等格式,其schema我们需要在创建topic时由用户确定,同时对于json/avro我们还通过kafka schema registry来约束schema的前后兼容。时间属性(哪个字段作为时间戳,是处理时间还是事件事件,事件事件的watermark等)由用户建表时确定。 资产属性:该表底层的kafka topic、topic的格式以及topic所属的kafka集群的连接信息,在资产管理模块定义。 权限和自定义属性:如ConsumerGroup、起始offset以及自定义的consumer配置如request.timeout.ms、max.poll.records等高级配置,可以在建表时自定义,也可以在运行时通过Table Hint(详见https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/hints.html)来覆盖。
在ETL Pipeline方面,其实并不复杂,以模型表为顶点,Flink ETL Job为边,确定输入和输出,即可构建Pipeline拓扑。需要注意的是,对于实时Streaming ETL和离线Batch ETL有不同:实时Streaming ETL相对简单,pipeline中的每个Job由于是Flink长服务,可以独立开发上线;而离线Batch ETL的整个pipeline需要作为整个flow一起交给调度系统调度,还需要考虑上下游数据的依赖。
在SQL任务开发上,我们提供Web的SQL IDE,用户只需要输入SQL逻辑即可。我们尽量简化各种复杂的配置,让用户更专注于逻辑开发。SQL IDE提供了该项目下可用表的元数据预览和搜索(可用的catalog/database/table,以及table的字段名、类型等信息),提供UDF注册与管理功能。在作业配置上,比如对于planner/时间特性/状态的配置、table的一些配置、source/sink表的并行度、checkpoint的配置、以及其他自定义配置我们都可作为运行参数交给用户选择。
实时数仓,我们以Druid作为OLAP数据服务的引擎,在没有数仓化平台之前,用户是裸写supervisor的json通过REST API提交,这个json非常复杂,很容易写错误操作,用户使用成本很高,也不利于平台维护。在Ultron数仓流程中,我们将Druid的supervisor数据摄入抽象成了Cube构建的逻辑。根据输入kafka topic的格式和schema(已提前注册),用户只需要选择时间和维度、指标定义并加以存储周期和计算运行的一些配置即可构建supervisor。数仓Cube流程化后,用户的操作大大简化。平台还提供了supervisor task和datasource的一些基础管理功能。
微服务架构
整个Ultron服务采用微服务架构,基于Spring Cloud + Feign + Eureka,部署在K8S上。对于平台服务,微服务 + 云原生是未来的架构趋势,在这样的架构下可以做到serverless、高容错、易扩展,当负载遇到瓶颈时,可以一键扩容。另外,Ultron设计之初就考虑是平台无关的,我们不仅仅局限于360内部的基础设施版本,尽量做到兼容社区。如我们在平台中可以注册任意版本的Kafka集群,只需要提供其版本以及连接认证信息即可,对Flink的多版本支持和任意K8S/Mesos/YARN也是同样的道理。剥离与各组件的版本的强耦合,有利于底层升级时不影响Ultron平台服务,也使得Ultron的支持范围更广。
04 未来规划
目前Ultron平台已经完成了部分工作,包括项目管理、Flink集群部署和作业管理、Flink任务开发和上线流程、Druid数据服务等。未来的平台化工作主要有以下三大方面:
平台集成:
目前Kafka/Druid已经集成完毕,支持Kafka Topic的创建、变更、Schema注册和Producer/Consumer权限的申请,支持Druid Cube建模和摄取任务管理。未来我们实时数仓开发涉及到的HBase/AeroSpike/MySQL/TiDB/ES等都需要在平台进行资产化管理。同时,跨源之间的异构数据传输也需要在平台中支持。任务开发上线方面,我们需要提供一个预发布的方案,允许用户在真正上线前进行预发布,提前校验数据。
数仓化建设:
通过数仓建模流程,完善实时数仓元数据管理,同时提供数据地图、数据血缘、数据质量等治理工具。
批流合一:
集成Hive/Presto,整合离线数仓建模与任务开发,做到实时数仓与离线数仓的统一。离线数仓需要集成已有的Azkaban调度,支持离线批处理任务的开发、部署、调度上线、监控等一系列流程。
作者简介
朱广彬,360数据架构资深专家,360商业化数据架构负责人,2016年加入360,负责商业化广告业务的大数据架构基础设施和数据平台建设。加入360之前,曾就职于美团点评(原大众点评)数据平台数据架构组。
感谢阅读,本次分享的内容就结束了。本公众号保持日更,每天08:16发文,为您提供优秀高质量的数据领域的分享。加群或投稿可加v:iom1128,备注:数据,谢谢!
2021-08-19
2021-08-18
2021-08-17
2021-08-16
2021-08-13
2021-08-12
2021-08-12
2021-08-11
2021-08-10
2021-08-09